// Copyright (C) INFINI Labs & INFINI LIMITED.
//
// The INFINI Framework is offered under the GNU Affero General Public License v3.0
// and as commercial software.
//
// For commercial licensing, contact us at:
//   - Website: infinilabs.com
//   - Email: hello@infini.ltd
//
// Open Source licensed under AGPL V3:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

/* Copyright © INFINI LTD. All rights reserved.
 * Web: https://infinilabs.com
 * Email: hello#infini.ltd */

package rewrite_to_bulk

import (
	"fmt"
	log "github.com/cihub/seelog"
	"github.com/savsgio/gotils/bytes"
	"infini.sh/framework/core/config"
	"infini.sh/framework/core/errors"
	"infini.sh/framework/core/global"
	"infini.sh/framework/core/pipeline"
	"infini.sh/framework/core/util"
	"infini.sh/framework/lib/bytebufferpool"
	"infini.sh/framework/lib/fasthttp"
	"strings"
)

type RewriteToBulk struct {
	Prefix            string `config:"prefix" `
	AutoGenerateDocID bool   `config:"auto_generate_doc_id" `
	RemovedType       bool   `config:"type_removed" `
	ValidateBody      bool   `config:"validate_body" `
}

func init() {
	pipeline.RegisterFilterPluginWithConfigMetadata("rewrite_to_bulk", New, &RewriteToBulk{})
}

func New(c *config.Config) (pipeline.Filter, error) {

	runner := RewriteToBulk{
		ValidateBody:      true,
		AutoGenerateDocID: true,
	}

	if err := c.Unpack(&runner); err != nil {
		return nil, fmt.Errorf("failed to unpack the filter configuration : %s", err)
	}

	return &runner, nil
}

func (filter *RewriteToBulk) Name() string {
	return "rewrite_to_bulk"
}

func (filter *RewriteToBulk) Filter(ctx *fasthttp.RequestCtx) {

	path := string(ctx.PhantomURI().Path())
	valid, indexPath, typePath, idPath := ParseURLMeta(path)
	if global.Env().IsDebug {
		log.Debugf("rewrite_to_bulk: %v => %v, %v, %v, %v", path, valid, indexPath, typePath, idPath)
	}
	if valid {

		routing := ctx.PhantomURI().QueryArgs().Peek("routing")
		pipeline := ctx.PhantomURI().QueryArgs().Peek("pipeline")
		versionType := ctx.PhantomURI().QueryArgs().Peek("version_type")
		version := ctx.PhantomURI().QueryArgs().Peek("version")
		contentEncoding := string(ctx.Request.Header.PeekAny([]string{fasthttp.HeaderContentEncoding, fasthttp.HeaderContentEncoding2}))

		action := "index"
		if typePath == "_update" {
			action = "update"
			typePath = ""
		} else if typePath == "_doc,_update" {
			action = "update"
			typePath = "_doc"
		} else if typePath == "_create" {
			action = "create"
			typePath = ""
		} else if typePath == "_delete" || ctx.IsDelete() {
			action = "delete"
			typePath = ""
		}

		if idPath == "" && filter.AutoGenerateDocID {
			idPath = util.GetUUID()
			if filter.Prefix != "" {
				idPath = filter.Prefix + idPath
			}
			ctx.Request.Header.Set("X-Generated-ID", idPath)
			ctx.Response.Header.Set("X-Generated-ID", idPath)
		}

		//url
		uri := ctx.Request.CloneURI()
		uri.SetPath("/_bulk")
		ctx.Request.SetURI(uri)
		fasthttp.ReleaseURI(uri)

		docBuf := bytebufferpool.Get("rewrite_to_bulk")
		docBuf.Reset()
		defer bytebufferpool.Put("rewrite_to_bulk", docBuf)

		//write index part
		if indexPath == "" {
			panic("index can't be nil")
		}

		docBuf.WriteString(fmt.Sprintf("{ \"%v\" : { \"_index\" : \"%s\" ", action, indexPath))
		//write type part
		if typePath != "" && !filter.RemovedType {
			docBuf.WriteString(fmt.Sprintf(", \"_type\" : \"%s\" ", typePath))
		}
		//write id part
		if idPath != "" {
			docBuf.WriteString(fmt.Sprintf(", \"_id\" : \"%s\" ", idPath))
		}

		if routing != nil {
			docBuf.WriteString(fmt.Sprintf(", \"routing\" : \"%s\" ", string(routing)))
		}

		if versionType != nil {
			docBuf.WriteString(fmt.Sprintf(", \"version_type\" : \"%s\" ", string(versionType)))
		}

		if version != nil {
			docBuf.WriteString(fmt.Sprintf(", \"version\" : \"%s\" ", string(version)))
		}

		if pipeline != nil {
			docBuf.WriteString(fmt.Sprintf(", \"pipeline\" : \"%s\" ", string(pipeline)))
		}

		//write final part
		docBuf.WriteString("} }\n")

		if action != "delete" {
			var body []byte
			var err error

			if contentEncoding == "gzip" {
				body, err = ctx.Request.BodyGunzip()
				if err != nil {
					panic(err)
				}
			} else {
				body = ctx.Request.Body()
			}

			if len(body) <= 0 {
				panic("invalid body for such bulk request, should be a valid json")
			}

			if filter.ValidateBody {
				obj := util.MapStr{}
				util.MustFromJSONBytes(body, &obj)
			}

			util.WalkBytesAndReplace(body, util.NEWLINE, util.SPACE)
			docBuf.Write(bytes.Copy(body))
		}

		docBuf.WriteString("\n")

		if contentEncoding == "gzip" {
			ctx.Request.ResetBody()
			_, err := fasthttp.WriteGzipLevel(ctx.Request.BodyWriter(), bytes.Copy(docBuf.Bytes()), fasthttp.CompressBestCompression)
			if err != nil {
				panic(errors.Errorf("failed to compress message: %v", err))
			}
		} else {
			ctx.Request.SetBody(bytes.Copy(docBuf.Bytes()))
		}
	}
}

func ParseURLMeta(pathStr string) (valid bool, urlLevelIndex, urlLevelType, id string) {

	if strings.Index(pathStr, "//") >= 0 {
		pathStr = strings.ReplaceAll(pathStr, "//", "/")
	}

	if strings.LastIndex(pathStr, "/") == 0 {
		return false, urlLevelIndex, urlLevelType, id
	}

	if util.SuffixStr(pathStr, "/") {
		pathStr = util.TrimRightStr(pathStr, "/")
	}

	pathArray := strings.Split(pathStr, "/")

	last := pathArray[len(pathArray)-1]

	//only _doc and _create are valid for create new doc
	if util.PrefixStr(last, "_") && !util.ContainsAnyInArray(last, []string{"_create", "_doc", "_update"}) {
		return false, urlLevelIndex, urlLevelType, id
	}

	switch len(pathArray) {
	case 5:
		urlLevelIndex = pathArray[1]
		urlLevelType = pathArray[2]
		id = pathArray[3]
		if pathArray[4] == "_update" && pathArray[2] == "_doc" {
			urlLevelType = fmt.Sprintf("%s,%s", pathArray[2], pathArray[4])
		}
		break
	case 4:
		urlLevelIndex = pathArray[1]
		urlLevelType = pathArray[2]
		id = pathArray[3]
		break
	case 3:
		urlLevelIndex = pathArray[1]
		urlLevelType = pathArray[2]
		break
	case 2:
		urlLevelIndex = pathArray[1]
		return false, urlLevelIndex, urlLevelType, id
	}

	if util.SuffixStr(urlLevelIndex, "_") {
		return false, urlLevelIndex, urlLevelType, id
	}

	return true, urlLevelIndex, urlLevelType, id
}
